package cn.uncode.rpc.transport.support;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import cn.uncode.rpc.common.CommonConstant;
import cn.uncode.rpc.common.log.Logger;
import cn.uncode.rpc.common.log.LoggerFactory;
import cn.uncode.rpc.core.URL;
import cn.uncode.rpc.core.URLParam;
import cn.uncode.rpc.exception.TransportException;
import cn.uncode.rpc.spi.ExtensionLoader;
import cn.uncode.rpc.transport.Client;
import cn.uncode.rpc.transport.HeartBeatManager;
import cn.uncode.rpc.transport.HeartbeatFactory;

/**
 * Default {@link HeartBeatManager}: keeps a registry of clients and, once
 * {@link #init()} has been called, periodically sends a heartbeat request to
 * every registered client that reports itself as not available.
 *
 * <p>The registry is a {@link ConcurrentMap}, so clients may be added or
 * removed while the periodic task is iterating. {@link #init()} and
 * {@link #destroy()} are synchronized with each other.
 */
public class DefaultHeartBeatManager implements HeartBeatManager {

    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultHeartBeatManager.class);

    /** Registered clients mapped to the factory that builds their heartbeat requests. */
    private final ConcurrentMap<Client, HeartbeatFactory> endpoints = new ConcurrentHashMap<Client, HeartbeatFactory>();

    // Few instances of this class are normally created. Each instance owns its
    // scheduler instead of sharing one, because a shared scheduler is easily
    // affected when some other task blocks.
    private ScheduledExecutorService executorService = null;

    /**
     * Starts the periodic heartbeat task on a dedicated single-thread scheduler.
     * The first pass runs after {@code CommonConstant.HEARTBEAT_PERIOD}
     * milliseconds and each following pass starts the same delay after the
     * previous one finished.
     *
     * <p>Calling this method while the manager is already started is a no-op,
     * so a repeated call neither leaks the previous scheduler nor doubles the
     * heartbeat traffic.
     */
    @Override
    public synchronized void init() {
        // TODO (carried over from the original source; no details were given)
        if (executorService != null) {
            return;
        }

        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                sendHeartbeats();
            }
        }, CommonConstant.HEARTBEAT_PERIOD, CommonConstant.HEARTBEAT_PERIOD, TimeUnit.MILLISECONDS);
        executorService = scheduler;
    }

    /**
     * Stops the periodic heartbeat task. Safe to call when {@link #init()} was
     * never invoked or when the manager has already been destroyed; after this
     * call {@link #init()} may be used to start the manager again.
     */
    @Override
    public synchronized void destroy() {
        if (executorService == null) {
            return;
        }
        executorService.shutdownNow();
        executorService = null;
    }

    /**
     * Registers a client for heartbeat checks. The heartbeat factory is looked
     * up through the extension loader using the name taken from the client's
     * URL parameter {@code URLParam.HEARTBEAT_FACTORY} (falling back to that
     * parameter's default value).
     *
     * @param client the client to register
     * @throws TransportException if no heartbeat factory is found for the resolved name
     */
    @Override
    public void addClient(Client client) {
        URL url = client.getUrl();

        String heartbeatFactoryName = url.getParameter(URLParam.HEARTBEAT_FACTORY.getName(), URLParam.HEARTBEAT_FACTORY.getValue());

        HeartbeatFactory heartbeatFactory = ExtensionLoader.getExtensionLoader(HeartbeatFactory.class).getExtension(heartbeatFactoryName);

        if (heartbeatFactory == null) {
            throw new TransportException("HeartbeatFactory not exist: " + heartbeatFactoryName);
        }

        endpoints.put(client, heartbeatFactory);
    }

    /**
     * Removes a client from the registry so it no longer receives heartbeats.
     *
     * @param endpoint the client to remove
     */
    @Override
    public void removeClient(Client endpoint) {
        endpoints.remove(endpoint);
    }

    /**
     * Returns a read-only view of the registered clients. The view is backed by
     * the live registry, so later additions and removals are visible through it.
     *
     * @return unmodifiable set of the currently registered clients
     */
    public Set<Client> getClients() {
        return Collections.unmodifiableSet(endpoints.keySet());
    }

    /** Runs one heartbeat pass over every registered client. */
    private void sendHeartbeats() {
        for (Map.Entry<Client, HeartbeatFactory> entry : endpoints.entrySet()) {
            Client endpoint = entry.getKey();

            try {
                // A client that is already available does not need a heartbeat.
                if (endpoint.isAvailable()) {
                    continue;
                }

                HeartbeatFactory factory = entry.getValue();
                endpoint.heartbeat(factory.createRequest());
            } catch (Exception e) {
                // The message is built null-safely: an exception escaping the
                // scheduled task would suppress every later execution.
                LOGGER.error("HeartbeatEndpointManager send heartbeat Error: url=" + describeUrl(endpoint), e);
            }
        }
    }

    /** Returns the client's URI for log messages, or {@code "null"} when the client has no URL. */
    private static String describeUrl(Client endpoint) {
        URL url = endpoint.getUrl();
        return url == null ? "null" : String.valueOf(url.getUri());
    }

}
